package tom.reactor.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
import tom.reactor.model.MsgEvent;
import tom.reactor.service.MsgEventService;

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @author ZHUFEIFEI
 */
@RestController
@RequestMapping("/event")
public class MsgEventController {

    @Autowired
    private MsgEventService service;

    /**
     * consumes接收stream类型的数据，一直接收
     * @param events
     * @return
     */
    @PostMapping(consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Mono<Void> receiveEvent(@RequestBody Flux<MsgEvent> events) {
            return this.service.saveEvents(events);
    }

    /**
     * produces返回stream类型数据, 源源不断的返回
     * @return
     */
    @GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<MsgEvent> publishEvent() {
        return this.service.findEvents();
    }

    /**
     * ServerSentEvent 等效 produces = MediaType.APPLICATION_STREAM_TEXT_VALUE
     * @return
     */
    @GetMapping("/rand")
    public Flux<ServerSentEvent<Integer>> rand() {
        //每秒钟产生一个数，从0开始递增，数值转化为元组，元组转化为ServerSentEvent
        return Flux.interval(Duration.ofSeconds(1L))
                //Tuples元组，第一个参数是元组第一个元素，第二个参数是元组第二个元素
                .map(seq -> Tuples.of(seq, ThreadLocalRandom.current().nextInt()))
                //得到元组data并转化为serverSentEvent类型
                .map(data -> ServerSentEvent.<Integer>builder()
                        .id(data.getT1().toString())
                        .comment("random")
                        .data(data.getT2())
                        .build());
    }

}
